/*
 *  Copyright (C) 2004 Cidero, Inc.
 *
 *  Permission is hereby granted to any person obtaining a copy of 
 *  this software to use, copy, modify, merge, publish, and distribute
 *  the software for any non-commercial purpose, subject to the
 *  following conditions:
 *  
 *  The above copyright notice and this permission notice shall be included
 *  in all copies or substantial portions of the Software.
 *
 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 
 *  OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 *  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 
 *  THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 *  LIABILITY IN CONNECTION WITH THE SOFTWARE.
 * 
 *  File: $RCSfile: ByteBufferQueue.java,v $
 *
 */

package com.cidero.util;

import java.nio.*;
import java.io.IOException;
import java.util.LinkedList;

/*
 *  Queue for passing ByteBuffer objects between threads in a synchronized
 *  manner. The queue maintains a 'pool' of ByteBuffer objects to keep
 *  overhead due to object creation & garbage collection low.
 *
 *  Buffers are managed internally as a stack - the idea is to reuse the 
 *  most recently used buffers as opposed to cycling through all the 
 *  buffers (better cache performance)
 *
 *  A typical example of producer/consumer usage:
 *
 *   // Set up queue, specifying the max size of each packet and the 
 *   // number of packets
 *
 *   ByteBufferQueue queue = new ByteBufferQueue( bufSize, maxBufs );
 *
 *   //-------------------------------------------------------------
 *   // Writer thread
 *   //-------------------------------------------------------------
 *
 *   while( true )
 *   {
 *     // Get a free buffer
 *     ByteBuffer qBuf = queue.alloc( timeoutMilli );
 *      
 *     // Fill it with data from external byte array
 *     qBuf.put( writeByteArray, 0, writeByteArray.length );
 *
 *     // Put buffer on queue
 *     queue.put( qBuf, timeoutMilli );
 *   
 *   }
 *
 *   //-------------------------------------------------------------
 *   // Reader thread
 *   //-------------------------------------------------------------
 *
 *   while( true )
 *   {
 *     // Get a full buffer
 *     ByteBuffer qbuf;
 *     while( (qBuf = queue.get( timeoutMilli )) != null )
 *       System.out.println("Timeout");
 *      
 *     // Extract data into local buffer (assuming limit < readByteArry.length)
 *     qBuf.get( readByteArray, 0, qBuf.limit() );
 *
 *     // Free buffer back to queue when done
 *     queue.freeu( qBuf );
 *   
 *   }
 *
 * 
 *
 */
public class ByteBufferQueue 
{
  int bufSize = -1;  
  int nBufs = 0;     
  LinkedList queue = new LinkedList();
  LinkedList freeStack = new LinkedList();
  boolean eof = false;
  
  /**
   *  Construct a ByteBufferQueue. 
   *
   * @param  bufSize  Buffer size for each ByteBuffer in queue
   * @param  nBufs    Number of elements in queue (ByteBuffer 'pool')
   */
  public ByteBufferQueue( int bufSize, int nBufs )
  {
    this.bufSize = bufSize;
    this.nBufs = nBufs;

    // Allocate buffers and put them on free stack
    for( int n = 0 ; n < nBufs ; n++ )
    {
      ByteBuffer buf = ByteBuffer.allocate( bufSize );

      // TODO: Add support for direct buffers for additional optimization
      // ByteBuffer buf = ByteBuffer.allocateDirect( bufSize ) 
      
      freeStack.add( buf );
    }
    
  }

  /**
   *  put buffer on queue
   *
   *  @param timeoutMillisec - time to wait in milliseconds. if timeout is
   *  0, wait forever 
   *
   *  @return  true if object added to list, false if not
   *  
   */
  public synchronized void put( ByteBuffer buf )
  {
    buf.flip();  // Flip buffer so it is ready for reading

    //  System.out.println("---- put: Putting to queue");

    queue.addLast( buf );

    notify();  // notify reader that new queue packet is present
  }

  /**
   *  get object from queue
   *
   *  @param timeoutMillisec - time to wait in milliseconds. if timeout is
   *  0, wait forever 
   *
   *  @return  object, or null if timeout
   *  
   */
  public synchronized ByteBuffer get( long timeoutMillisec ) throws IOException
  {
    if( queue.isEmpty() )
    {
      //System.out.println("---- get: Waiting on empty queue");
      
      try
      {
        wait( timeoutMillisec );
      }
      catch( InterruptedException e )
      {
        //System.out.println("---- get: Interrupted exception " + e  );
        // Ignore for now
      }
      
      // System.out.println("---- get: Past waiting on empty queue");

      if( queue.isEmpty() )
      {
        // If the EOF flag is set, throw exception
        if( eof )
          throw new IOException("EOF encountered");

        return null;
      }
    }
    else
    {
      //System.out.println("get: non-empty queue upon entry");
    }
    
    return (ByteBuffer)queue.removeFirst();
  }

  public synchronized ByteBuffer alloc( long timeoutMillisec )
  {
    if( freeStack.isEmpty() )
    {
      //System.out.println("alloc: waiting for free pkt");

      try
      {
        wait( timeoutMillisec );
      }
      catch( InterruptedException e )  // thread interrupted
      {
        System.out.println("ByteBufferQueue: exception: " + e );
        // Ignore for now
      }

      if( freeStack.isEmpty() )
        return null;
    }

    return (ByteBuffer)freeStack.removeLast();
  }

  /**
   *   Free the buffer back to the queue
   */
  public synchronized void free( ByteBuffer buf )
  {
    buf.clear();  // reset buffer so it is ready for new data

    freeStack.add( buf );
    notify();   // Notify sender side that more queue space is now available
  }

  /**
   *   Clear the queue (free all outstanding queue buffers)
   */
  public synchronized void clear()
  {
    ByteBuffer buf;
    
    while( !queue.isEmpty() )
    {
      buf = (ByteBuffer)queue.removeFirst();
      freeStack.add( buf );
    }
    
    notify();   // Notify sender side that more queue space is now available

    //eof = false;
  }

  /** 
   *  Open the queue. This call is useful when re-using the same queue
   *  over and over (avoid constructor calls & garbage collection)
   *  It should be called following the constructor, and everytime
   *  one wants to re-use the queue following a close()
   */
  public synchronized void open()
  {
    eof = false;
    clear();
  }

  /** 
   *  Set the EOF state of the queue to true - reader side can then
   *  detect the 'closed queue' condition. Similar semantics to a 
   *  the 'Broken connection' condition observed by a client when 
   *  a server closes a socket
   */
  public synchronized void close()
  {
    eof = true;
  }
  

}

